package org.example.realtime.jtp.dim.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;

import java.util.HashMap;
import java.util.Map;
/**
 * @Title: HbaseDimMapFunction
 * @Author Lianzy
 * @Package org.example.realtime.jtp.dim.function
 * @Date 2025/6/2 21:08
 * @description 对维度表CDC数据，增加写入HBase表数据对应字段信息：表名称、列簇名称、RowKey值对应字段名称等
 */
public class HbaseDimMapFunction extends RichMapFunction<String, String>{
    //维度表名称与主键字段映射集合
    private  Map<String, String> dimMap ;
    public HbaseDimMapFunction(HashMap<String, String> dimMap){
        this.dimMap = dimMap;
    }
    @Override
    public String map(String value) throws Exception {
        /*
          {
             "operate_type": "insert",
             "db_name": "jtp_mall",
             "operate_data": {
                "dic_code": "1306",
                "create_time": "2025-01-11 11:30:49.0",
                "parent_code": "13",
                "dic_name": "不想买了",
                "operate_time": "2025-01-11 11:30:49.0"
             },
             "table_name": "base_dic",
             "ts_ms": 1737688970209
          }

          put 'dim_base_dic', '1001', 'info:dic_code', '10'
          |
          表名称、rowKey值（主键字段名称）、列簇名称、列名称、列对应值
        */
        //解析json
        JSONObject jsonObject = JSON.parseObject(value);
        //添加字段:hbase表名称
        jsonObject.put("hbase_table_name", "dim_" + jsonObject.getString("table_name"));
        //添加字段:列簇名称
        jsonObject.put("family_name", "info");
        //添加字段:rowKey值对应字段名称
        jsonObject.put("row_key_column", dimMap.get(jsonObject.getString("table_name")));
        //返回json
        return jsonObject.toJSONString();
        /*
          {
             "operate_type": "insert",
             "db_name": "jtp_mall",
             "hbase_table_name": "dim_base_dic",
             "operate_data": {
                "dic_code": "0701",
                "create_time": "2025-01-11 11:30:49.0",
                "parent_code": "07",
                "dic_name": "商家审核中",
                "operate_time": "2025-01-11 11:30:49.0"
             },
             "table_name": "base_dic",
             "family_name": "info",
             "ts_ms": 1738735567455,
             "row_key_column": "dic_code"
          }
          |
          insert 类型json字符串转换 put插入语句
          put ${hbase_table_name}, ${operate_data}.${row_key_column}, ${family_name}:${字段名称}, ${字段值}
        */
    }
}
